SpringBoot + Kafka 使用@KafkaListener注解批量消费 您所在的位置:网站首页 springboot kafka 多个消费者 SpringBoot + Kafka 使用@KafkaListener注解批量消费

SpringBoot + Kafka 使用@KafkaListener注解批量消费

2023-09-07 08:58| 来源: 网络整理| 查看: 265

使用@KafkaListener 注解进行批量消费时,出现如下报错:

Cannot convert from [java.lang.String] to [org.apache.kafka.clients.consumer.ConsumerRecord]

原因是默认没有开启批量监听的,解决办法是设置注解的 containerFactory 属性。

完整代码如下

1)、批量 消费监听工厂类

@Configuration public class KafkaConfiguration { /** * 解决批量消费的问题 * @param properties 配置信息,springboot 从配置文件获取, 自动注入 * @return 批量工厂类 */ @Bean public KafkaListenerContainerFactory batchFactory(KafkaProperties properties) { Map consumerProperties = properties.buildConsumerProperties(); ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProperties)); factory.setBatchListener(true); // 开启批量监听 return factory; } }

2)、消费监听

@Component @RequiredArgsConstructor public class GxdcKafkaConsumer { private final GxdcService gxdcService; /** * 共享单车消费Listener, 批量处理加上containerFactory = "batchFactory" * @param records 消息记录对象,此处为批量消费,若单条消费,此处改为ConsumerRecord * @param consumer 消费者对象,可以获取分区、主题等信息,也可进行手动提交操作 */ @KafkaListener(topics = {"${spring.kafka.consumer.topics.ods_dc_count_result}"}, containerFactory = "batchFactory") public void listen(ConsumerRecords records, Consumer consumer) { if (records.isEmpty()) { return; } // 消息逻辑处理 for (ConsumerRecord record : records) { switch (GxdcKeyEnum.getInstance(record.key())) { case DC_POINT: BikePointInfo pointInfo = JSON.parseObject(record.value(), BikePointInfo.class); gxdcService.saveBikePointInfo(pointInfo); break; case DC_ORDER: OrderSummaryInfo summaryInfo = JSON.parseObject(record.value(), OrderSummaryInfo.class); gxdcService.saveOrderSummaryInfo(summaryInfo); break; default: } } } }



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有